【CDI】Informatica Cloud Data Integrationでハンズオン課題を考えてみた
はじめに
データアナリティクス事業本部ビッグデータチームのyosh-kです。
チームに新規参入の方が増えたので、その際のCDI ハンズオン課題をブログに残しておきたいと思います。
前提条件
- CDIを操作できる環境を構築済みであることを前提とします。
- Redshift ServerlessとS3 Bucketへの相互のロード処理を行いますので、事前にRedshift Serverless, S3 Bucket, IICSコネクタの設定をしていることとします。未設定の場合は、以下Redshift Connectorの接続ブログをご確認だください。
- 今回使用するデータとddl、csv, jsonはすべて以下Github上に格納しています。AWS公式ハンズオンのサンプルデータをダウンロードし、csvファイル形式としたものになります。ddlに関しては事前に実行してtableを作成しておいてください。
- CDIには様々な実装方法があり、必ずしも今回のハンズオン実装が正解とは限りませんのであらかじめご承知おきください。
-
参考:
課題
構成図に沿って作成していきます。
- 課題1)DWH tableへのInsert
- 課題2)DataMart tableへのUpsert
課題1
sales
table用タスクフロー作成:ユーザーはDWHsales
tableに格納するcsvファイルを手動で任意の格納先S3に配置し、それをトリガーとして起動するタスクフローを作成します。sales
マッピング :sales
tableにInsertするマッピング処理を実装します。Insertする際はupdate_at
カラムに更新日付も登録します。
event
table用タスクフロー作成:ユーザーはDWHevent
tableに格納するcsvファイルを手動で任意の格納先S3に配置し、それをトリガーとして起動するタスクフローを作成します。event
マッピング :event
tableにInsertするマッピング処理を実装します。Insertする際は、update_at
カラムに更新日付も登録します。
1回目のデータ | 2回目のデータ |
---|---|
event_data.csv | upsert_event_data.csv |
sales_data.csv | upsert_sales_data.csv |
課題2
event_sales_summary
table用タスクフロー作成:DWHへInsertする処理後にスケジュール起動するタスクフローを作成します。select_sales_last_update_time
マッピング:sales
tableの前回実行以降のデータを差分抽出するために、last_updated_time
tableからsales
のlast_updated
を抽出するマッピングを作成し、取得した値を入出力パラメータに格納します。select_event_last_update_time
マッピング:event
tableの前回実行以降のデータを差分抽出するために、last_updated_time
tableからevent
のlast_updated
を抽出するマッピングを作成し、取得した値を入出力パラメータに格納します。event_sales_summary
マッピング:以下のsql条件で、先程の入出力パラメータの値を用いて前回実行以降のデータを差分抽出し、DataMartのevent_sales_summary
tableへUpsertするマッピングを作成します。upsert_sales_last_update_time
マッピング: DataMart処理実行後にlast_updated_time
tableのtable_name
カラムがsales
のlast_updated
カラムをUpsertします。upsert_event_last_update_time
マッピング: DataMart処理実行後にlast_updated_time
tableのtable_name
カラムがevent
のlast_updated
カラムをUpsertします。
-- DataMart SQL WITH last_updated_sales AS ( SELECT last_updated FROM last_updated_time WHERE table_name = 'sales' ), last_updated_event AS ( SELECT last_updated FROM last_updated_time WHERE table_name = 'event' ) SELECT e.eventname, s.sellerid, SUM(s.pricepaid * s.qtysold) AS total_sales FROM sales s JOIN event e ON s.eventid = e.eventid JOIN last_updated_sales lus ON s.update_at > lus.last_updated JOIN last_updated_event lue ON e.update_at > lue.last_updated WHERE e.catid = '9' GROUP BY e.eventname, s.sellerid ORDER BY total_sales DESC;
注意点
今回の構成は、CDIでの実装ハンズオンのため、CDI上でSQL コンポーネントを使用しないように実装してみてください。以降は、実際のハンズオンとなりますので、一度時間をかけて実装してみた後に自身の実装と照らし合わせていただければと思います。
課題1のハンズオン
m_sw_sales
それでは以下のマッピングから作成していきます。
sales
マッピング :sales
tableにInsertするマッピング処理を実装します。Insertする際はupdate_at
カラムに更新日付も登録します。
事前にsales_header.csv
とsales_data .csv
配置し、不要なファイルはdoneフォルダに格納するようにS3 Bucket上で操作します。
まずはソースに先ほど格納したsales_header.csv
を選択します。
次に詳細からSource TypeをDirectory
を選択し、フォルダーパスに先ほどsales_header.csv
ファイルを格納したS3 Pathを指定します。この設定により、オブジェクトで指定したheaderのcsvファイルではなく、Folder Path直下のファイルをすべて参照することになります。これによりsales_data_yyyymmdd.csv
のような可変なcsvファイル名で来た場合でも取り込めるようになります。sales_header.csv
の役割としては、フィールド値を指定する必要があるため作成しています。なので、headerファイルの中身はカラム名のみとなっています。
次に形式オプションを開き、スキーマファイルにGithubに格納されているshm_ m_sw_sales.json
を設定します。スキーマファイルを設定することでフィールドの精度をそれぞれのカラムに合わせてカスタマイズすることができます。修飾子モードは修飾子として"
を使用したいため、ALL
を選択し、それ以外はデフォルトのままで設定しておきます。
次に式Transformationを設定します。
プラスボタンの新しいフィールドからupdate_at
カラムの値のためにnew_updt_at
フィールドを作成し、システム変数のSYSDATE
を使用します。
saletime
カラムにdatetime型として値を登録するためにnew_saletime
を以下の式の値で設定します。
IIF( IsNull(saletime), NULL, To_Date(saletime,'YYYY-MM-DD HH24:MI:SS' ))
次にターゲットを設定します。ターゲットにはdwhsales
tableを指定し、Upsertで更新カラムはsalesid
を指定します。
Redshiftへデータを取り込む際に一時的利用するS3 Bucketには任意のBucket名を記載します。
フィールドマッピングではフィールドマップオプションを手動
に設定し、自動マップからスマートマップ
を実行、実行後にsaletime
はマップ解除し、new_
の値をそれぞれ、左からドラッグ&ドロップします。
mt_sw_sales
次にマッピングタスクですが、先ほど作成したマッピングを指定し、詳細セッションプロパティで以下の値を入力し、保存したら完了となります。
セッションプロパティ名 | セッションプロパティ値 | 備考 |
---|---|---|
日時形式文字列 | YYYY-MM-DD HH24:MI:SS | datetime型の形式を指定 |
エラー時に停止 | 1 | エラー時に1であると停止する設定 |
tf_sw_sales
次にタスクフローを作成します。左側の新規ボタンからタスクフローを選択します。
データタスクで先ほど作成したマッピングタスクを指定します。
エラー処理ではエラー発生時
でカスタムエラー処理
にチェックを入れます。
エラー時に通知タスクを紐づけます。任意のメールと件名を設定します。
本文はHTML形式を選択し、任意のエラー文言と該当のデータタスクフィールドを左から選択してエラー内容を表示します。
エラー時にスローコンポーネントを紐づけて、データタスクの失敗時のコード、詳細、理由をそれぞれ選択します。
fl_tf_sw_sales
次にファイルリスナを作成します。
新規のコンポーネントからファイルリスナを選択します。 それぞれ以下の設定とします。表の設定項目以外は、デフォルトのまま使用します。
項目 | 設定 |
---|---|
ファイルリスナの名前 | fl_<タスクフロー名> |
ソースタイプ | コネクタ |
接続タイプ | Amazon S3 v2 |
ソースタイプ | コネクタ |
接続 | 事前に作成したs3 Connector |
フォルダパス | ファイルをリスニングするフォルダパス |
次にタスクフローにファイルリスナを紐づけるために、タスクフローを一度publish解除します。
解除した後に、開始プロパティからバインディングにイベント
を選択し、イベントリソース名に先ほど作成したファイルリスナ名を選択し、再びタスクフローpublishしたら完了です。
課題1のsales tableについて、それぞれ作成しました。event tableについては、全く同様の手順で作成できるため、割愛させていただきます。
課題2のハンズオン
次に課題2のハンズオンになります。以下の順番で実装していきたいと思います。
select_sales_last_update_time
マッピング:sales
tableの前回実行以降のデータを差分抽出するために、last_updated_time
tableからsales
のlast_updated
を抽出するマッピングを作成し、取得した値を入出力パラメータに格納します。event_sales_summary
マッピング:以下のsql条件で、先程の入出力パラメータの値を用いて前回実行以降のデータを差分抽出し、DataMartのevent_sales_summary
tableへUpsertするマッピングを作成します。upsert_sales_last_update_time
マッピング: DataMart処理実行後にlast_updated_time
tableのtable_name
カラムがsales
のlast_updated
カラムをUpsertします。event_sales_summary
table用タスクフロー作成:DWHへInsertする処理後にスケジュール起動するタスクフローを作成します。
m_w_select_sales_last_update_time
それでは、last_update_time
tableからsales
tableの最終更新日付を取得するマッピングを作成します。
まずはソースにlast_update_time
tableを選択し、クエリオプションのフィルタでsales
で絞り込むように設定します。
次にevent_sales_summary
マッピングで最終更新日付を使用できるように入出力パラメータを作成します。
入出力パラメータについては以下記事を参考にしました。
【CDI】Informatica Cloud Data Integrationでターゲットファイルを作成せずに入出力パラメータに値を代入する方法
次に式をの中で新たにevent_last_updated
フィールドをdatetime型で作成します。
フィールドの式としては以下のように設定することで、先ほど作成したパラメータに最終更新日付をセットすることができます。
SetVariable($$param_sales_last_updated,last_updated)
ターゲットでは何も出力したくないので、フラットファイルコネクタでdev/null
を指定することでファイルを出力しないようにします。
マッピングタスクは課題1と同様の設定で作成すれば良いので割愛させていただきます。m_w_select_event_last_update_time
マッピングについても同様の手順となるので、割愛します。
m_wm_event_sales_summary
次にevent_sales_summary
tableへUpsertするマッピングを作成します。
まず、先ほど渡した入出力パラメータを使用するために、本マッピングでも入出力パラメータを設定します。
ソースにsales
tableを選択します。
クエリオプションのフィルタの設定で以下の条件とすることで、最終更新日付以降のデータを取得します。
sales.update_at > '$$param_sales_last_updated'
event
tableのソースも同様に設定します。
次にjoinerの設定をします。まず、受信フィールドですが一部カラム名が重複しているので一括でそれぞれのプレフィックスを追加する機能を使用してカラム名を一意にします。
sales
tableの場合はs_
、event
tableの場合は、e_
と指定しました。
結合条件としては、eventidで結合するようにします。Masterにサイズの小さいデータソースを割り当てることが推奨されていますが、今回はどちらも同じデータサイズのため、sales
をマスタに設定します。
IICS CDI Mapping Designer入門 〜Joiner(結合)編〜
次にsqlでいうwhere句の処理にあたるfilerを実装します。条件としてe_catid
が9
で絞り込みます。
次にaggregatorで集計関数を設定します。グループ化でGrop Byで指定するe_eventname
、s_sellerid
を指定します。
集計で新たにtotal_salesフィールドを定義し、Sum関数で二つの値をかけた値を格納します。
Sum(s_pricepaid * s_qtysold)
Sorterでは、SQLでいうOrder byを表現するため、total_salesフィールドの降順を設定します。
ターゲットではevent_sales_summary
tableを設定し、eventnameカラムをUpsert Keyとします。ステージング用のS3 bucketも設定します。
フィールドマッピングを以下のように設定し、本マッピングの作成は完了です。マッピングタスクについては課題1と同様の作成方法のため割愛します。
m_w_upsert_sales_last_update_time
event_sales_summary
tableへのUpsert処理後に最終更新日付を更新するマッピングを作成します。
ソースにlast_updated_time
tableを指定し、フィルタの条件でsales
tableに絞り込みます。
式ではnew_last_updated
フィールドを作成し、SYSDATEを設定します。
ターゲットでは、last_updated_time
tableを指定し、table_nameをUpsert Keyに設定します。
フィールドマッピングに先ほど作成したフィールドを設定し、本マッピングの作成は完了です。マッピングタスクについては課題1と同様の作成方法のため割愛します。
tf_wm_event_sales_summary
タスクフローを作成します。まず、並列パスを設定し、mt_w_select_event_last_update_time
とmt_w_select_sales_last_update_time
を設定するデータタスクを作成します。
次にmt_wm_event_sales_summary
を設定するデータタスクを作成します。本データタスクの入力フィールドを設定し、先ほどのデータタスクの入出力パラメータを値として設定するようにします。
並列パスを設定し、mt_w_upsert_event_last_update_time
とmt_w_upsert_sales_last_update_time
を設定するデータタスクを作成します。
sc_tf_wm_event_sales_summary
最後にスケジュール設定を行います。
作成したタスクフローを右クリックし、スケジュールを選択します。
ここではジョブ名として、job_<タスクフロー>
と入力し、新しいスケジュールを選択します。
名前はsc_<タスクフロー>
と入力し、スケジュール時間を人二の値に設定します。
最後にスケジュールの割り当てを押下したら完了となります。
初回実行
それでは初回実行となりますが、初回実行の前に、last_updated_time tableに初期値が必要ですので、以下のInsert文を実行します。
INSERT INTO last_updated_time (table_name, last_updated) VALUES ('event', '2000-01-01 00:00:00'), ('sales', '2000-01-01 00:00:00');
それでは、ファイルリスナが起動した状態に設定し、S3 Bucketに対象ファイルを格納します。
ファイル転送ログからファイルリスナがそれぞれのファイルを検知していることを確認できました。
ファイルリスナ起動で実行された実行結果になります。共に正常に終了されていることを確認できました。
Redshift上で確認しても100件取り込めていることがわかります。
この状態で、MartにInsertするSQLを実行し、IICSでの処理後にMartがどの様なレコードが入るかを確認します。 全5レコードとなります。
次にMart処理を実行します。Martはスケジュール起動のため、任意の時間に設定します。
タスクフローが問題なく成功していることを確認しました。
Redshiftのevent_sales_summary
tableを確認しましたが、件数もレコードの値も先ほどの内容と一致したので、想定通りの結果となりました。
last_updated_time
tableのlast_updated
カラムも更新されていることが確認できました。
差分実行
続いて差分処理を確認していきます。 まずは先ほどと同様に差分用のcsvファイルを格納しDWH処理を実行します。
タスクフローが成功しました。
Redshiftにも3件登録されていることがわかります。
先ほどと同様に、MartにInsertするSQLを実行し、IICSでの処理後にMartがどの様なレコードが入るかを確認します。 実行結果は以下2レコードとなりました。
スケジュールでタスクフローを実行したところ、正常に終了しました。
SQLと同様に追加の2件がInsertされていることを確認できたので、問題なさそうですね。
最後に
どうしても画面操作が多く、画像での説明が多くなってしまいましたが、お付き合いいただきありがとうございました。 その他にもIICSのハンズオンや参考記事として、以下も確認いただければより理解が深まると思います。 お疲れ様でした!!